{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## install the streaming twitter jar in the notebook from the Github repo"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pixiedust\n",
    "jarPath = \"https://github.com/ibm-watson-data-lab/spark.samples/raw/master/dist/streaming-twitter-assembly-1.6.jar\"\n",
    "pixiedust.installPackage(jarPath)\n",
    "print(\"done\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use Scala Bridge to run the command line version of the app\n",
    "For instruction on how to set up the twitter and Tone Analyzer credentials, please refer to https://developer.ibm.com/clouddataservices/2016/01/15/real-time-sentiment-analysis-of-twitter-hashtags-with-spark/ "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%scala\n",
    "val demo = com.ibm.cds.spark.samples.StreamingTwitter\n",
    "demo.setConfig(\"twitter4j.oauth.consumerKey\",\"XXXX\")\n",
    "demo.setConfig(\"twitter4j.oauth.consumerSecret\",\"XXXX\")\n",
    "demo.setConfig(\"twitter4j.oauth.accessToken\",\"XXXX)\n",
    "demo.setConfig(\"twitter4j.oauth.accessTokenSecret\",\"XXXX\")\n",
    "demo.setConfig(\"watson.tone.url\",\"https://gateway.watsonplatform.net/tone-analyzer/api\")\n",
    "demo.setConfig(\"watson.tone.password\",\"XXXX\")\n",
    "demo.setConfig(\"watson.tone.username\",\"XXXX\")\n",
    "\n",
    "import org.apache.spark.streaming._\n",
    "demo.startTwitterStreaming(sc, Seconds(30))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%scala\n",
    "val demo = com.ibm.cds.spark.samples.StreamingTwitter\n",
    "val (__sqlContext, __df) = demo.createTwitterDataFrames(sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Do some data science with the DataFrame __df obtained from the scala code above"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "aggregation": "COUNT",
      "handlerId": "barChart",
      "keyFields": "Anger",
      "rowCount": "100",
      "showLegend": "true",
      "stacked": "true",
      "staticFigure": "false",
      "valueFields": "Openness"
     }
    }
   },
   "outputs": [],
   "source": [
    "tweets=__df\n",
    "tweets.count()\n",
    "display(tweets)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "#create an array that will hold the count for each sentiment\n",
    "sentimentDistribution=[0] * 13\n",
    "#For each sentiment, run a sql query that counts the number of tweets for which the sentiment score is greater than 60%\n",
    "#Store the data in the array\n",
    "for i, sentiment in enumerate(tweets.columns[-13:]):\n",
    "    sentimentDistribution[i]=__sqlContext.sql(\"SELECT count(*) as sentCount FROM tweets where \" + sentiment + \" > 60\")\\\n",
    "        .collect()[0].sentCount"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "ind=np.arange(13)\n",
    "width = 0.35\n",
    "bar = plt.bar(ind, sentimentDistribution, width, color='g', label = \"distributions\")\n",
    "\n",
    "params = plt.gcf()\n",
    "plSize = params.get_size_inches()\n",
    "params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )\n",
    "plt.ylabel('Tweet count')\n",
    "plt.xlabel('Tone')\n",
    "plt.title('Distribution of tweets by sentiments > 60%')\n",
    "plt.xticks(ind+width, tweets.columns[-13:])\n",
    "plt.legend()\n",
    "\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from operator import add\n",
    "import re\n",
    "tagsRDD = tweets.flatMap( lambda t: re.split(\"\\s\", t.text))\\\n",
    "    .filter( lambda word: word.startswith(\"#\") )\\\n",
    "    .map( lambda word : (word, 1 ))\\\n",
    "    .reduceByKey(add, 10).map(lambda (a,b): (b,a)).sortByKey(False).map(lambda (a,b):(b,a))\n",
    "top10tags = tagsRDD.take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "params = plt.gcf()\n",
    "plSize = params.get_size_inches()\n",
    "params.set_size_inches( (plSize[0]*2, plSize[1]*2) )\n",
    "\n",
    "labels = [i[0] for i in top10tags]\n",
    "sizes = [int(i[1]) for i in top10tags]\n",
    "colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral', \"beige\", \"paleturquoise\", \"pink\", \"lightyellow\", \"coral\"]\n",
    "\n",
    "plt.pie(sizes, labels=labels, colors=colors,autopct='%1.1f%%', shadow=True, startangle=90)\n",
    "\n",
    "plt.axis('equal')\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cols = tweets.columns[-13:]\n",
    "def expand( t ):\n",
    "    ret = []\n",
    "    for s in [i[0] for i in top10tags]:\n",
    "        if ( s in t.text ):\n",
    "            for tone in cols:\n",
    "                ret += [s.replace(':','').replace('-','') + u\"-\" + unicode(tone) + \":\" + unicode(getattr(t, tone))]\n",
    "    return ret \n",
    "def makeList(l):\n",
    "    return l if isinstance(l, list) else [l]\n",
    "\n",
    "#Create RDD from tweets dataframe\n",
    "tagsRDD = tweets.map(lambda t: t )\n",
    "\n",
    "#Filter to only keep the entries that are in top10tags\n",
    "tagsRDD = tagsRDD.filter( lambda t: any(s in t.text for s in [i[0] for i in top10tags] ) )\n",
    "\n",
    "#Create a flatMap using the expand function defined above, this will be used to collect all the scores \n",
    "#for a particular tag with the following format: Tag-Tone-ToneScore\n",
    "tagsRDD = tagsRDD.flatMap( expand )\n",
    "\n",
    "#Create a map indexed by Tag-Tone keys \n",
    "tagsRDD = tagsRDD.map( lambda fullTag : (fullTag.split(\":\")[0], float( fullTag.split(\":\")[1]) ))\n",
    "\n",
    "#Call combineByKey to format the data as follow\n",
    "#Key=Tag-Tone\n",
    "#Value=(count, sum_of_all_score_for_this_tone)\n",
    "tagsRDD = tagsRDD.combineByKey((lambda x: (x,1)),\n",
    "                  (lambda x, y: (x[0] + y, x[1] + 1)),\n",
    "                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))\n",
    "\n",
    "#ReIndex the map to have the key be the Tag and value be (Tone, Average_score) tuple\n",
    "#Key=Tag\n",
    "#Value=(Tone, average_score)\n",
    "tagsRDD = tagsRDD.map(lambda (key, ab): (key.split(\"-\")[0], (key.split(\"-\")[1], round(ab[0]/ab[1], 2))))\n",
    "\n",
    "#Reduce the map on the Tag key, value becomes a list of (Tone,average_score) tuples\n",
    "tagsRDD = tagsRDD.reduceByKey( lambda x, y : makeList(x) + makeList(y) )\n",
    "\n",
    "#Sort the (Tone,average_score) tuples alphabetically by Tone\n",
    "tagsRDD = tagsRDD.mapValues( lambda x : sorted(x) )\n",
    "\n",
    "#Format the data as expected by the plotting code in the next cell. \n",
    "#map the Values to a tuple as follow: ([list of tone], [list of average score])\n",
    "#e.g. #someTag:([u'Agreeableness', u'Analytical', u'Anger', u'Cheerfulness', u'Confident', u'Conscientiousness', u'Negative', u'Openness', u'Tentative'], [1.0, 0.0, 0.0, 1.0, 0.0, 0.48, 0.0, 0.02, 0.0])\n",
    "tagsRDD = tagsRDD.mapValues( lambda x : ([elt[0] for elt in x],[elt[1] for elt in x])  )\n",
    "\n",
    "#Use custom sort function to sort the entries by order of appearance in top10tags\n",
    "def customCompare( key ):\n",
    "    for (k,v) in top10tags:\n",
    "        if k == key:\n",
    "            return v\n",
    "    return 0\n",
    "tagsRDD = tagsRDD.sortByKey(ascending=False, numPartitions=None, keyfunc = customCompare)\n",
    "\n",
    "#Take the mean tone scores for the top 10 tags\n",
    "top10tagsMeanScores = tagsRDD.take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "params = plt.gcf()\n",
    "plSize = params.get_size_inches()\n",
    "params.set_size_inches( (plSize[0]*3, plSize[1]*2) )\n",
    "\n",
    "top5tagsMeanScores = top10tagsMeanScores[:5]\n",
    "width = 0\n",
    "ind=np.arange(13)\n",
    "(a,b) = top5tagsMeanScores[0]\n",
    "labels=b[0]\n",
    "colors = [\"beige\", \"paleturquoise\", \"pink\", \"lightyellow\", \"coral\", \"lightgreen\", \"gainsboro\", \"aquamarine\",\"c\"]\n",
    "idx=0\n",
    "for key, value in top5tagsMeanScores:\n",
    "    plt.bar(ind + width, value[1], 0.15, color=colors[idx], label=key)\n",
    "    width += 0.15\n",
    "    idx += 1\n",
    "plt.xticks(ind+0.3, labels)\n",
    "plt.ylabel('AVERAGE SCORE')\n",
    "plt.xlabel('TONES')\n",
    "plt.title('Breakdown of top hashtags by sentiment tones')\n",
    "\n",
    "plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='center',ncol=5, mode=\"expand\", borderaxespad=0.)\n",
    "\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use Twitter demo embedded app to run the same app with a UI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%scala\n",
    "val demo = com.ibm.cds.spark.samples.PixiedustStreamingTwitter\n",
    "demo.setConfig(\"twitter4j.oauth.consumerKey\",\"XXXX\")\n",
    "demo.setConfig(\"twitter4j.oauth.consumerSecret\",\"XXXX\")\n",
    "demo.setConfig(\"twitter4j.oauth.accessToken\",\"XXXX\")\n",
    "demo.setConfig(\"twitter4j.oauth.accessTokenSecret\",\"XXX\")\n",
    "demo.setConfig(\"watson.tone.url\",\"https://gateway.watsonplatform.net/tone-analyzer/api\")\n",
    "demo.setConfig(\"watson.tone.password\",\"XXXX\")\n",
    "demo.setConfig(\"watson.tone.username\",\"XXXX\")\n",
    "demo.setConfig(\"checkpointDir\", System.getProperty(\"user.home\") + \"/pixiedust/ssc\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "twitterdemo"
     }
    }
   },
   "outputs": [],
   "source": [
    "from pixiedust_twitterdemo import *\n",
    "twitterDemo()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The embedded app has generated a DataFrame called __tweets. Let's use it to do some data science"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    }
   },
   "outputs": [],
   "source": [
    "display(__tweets)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "aggregation": "COUNT",
      "handlerId": "barChart",
      "keyFields": "emotion",
      "showLegend": "true",
      "stacked": "true",
      "valueFields": "score"
     }
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "from pyspark.sql.types import *\n",
    "emotions=__tweets.columns[-13:]\n",
    "distrib = __tweets.flatMap(lambda t: [(x,t[x]) for x in emotions]).filter(lambda t: t[1]>60)\\\n",
    "    .toDF(StructType([StructField('emotion',StringType()),StructField('score',DoubleType())]))\n",
    "display(distrib)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "__tweets.registerTempTable(\"pixiedust_tweets\")\n",
    "#create an array that will hold the count for each sentiment\n",
    "sentimentDistribution=[0] * 13\n",
    "#For each sentiment, run a sql query that counts the number of tweets for which the sentiment score is greater than 60%\n",
    "#Store the data in the array\n",
    "for i, sentiment in enumerate(__tweets.columns[-13:]):\n",
    "    sentimentDistribution[i]=sqlContext.sql(\"SELECT count(*) as sentCount FROM pixiedust_tweets where \" + sentiment + \" > 60\")\\\n",
    "        .collect()[0].sentCount"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "ind=np.arange(13)\n",
    "width = 0.35\n",
    "bar = plt.bar(ind, sentimentDistribution, width, color='g', label = \"distributions\")\n",
    "\n",
    "params = plt.gcf()\n",
    "plSize = params.get_size_inches()\n",
    "params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )\n",
    "plt.ylabel('Tweet count')\n",
    "plt.xlabel('Tone')\n",
    "plt.title('Distribution of tweets by sentiments > 60%')\n",
    "plt.xticks(ind+width, __tweets.columns[-13:])\n",
    "plt.legend()\n",
    "\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from operator import add\n",
    "import re\n",
    "tagsRDD = __tweets.flatMap( lambda t: re.split(\"\\s\", t.text))\\\n",
    "    .filter( lambda word: word.startswith(\"#\") )\\\n",
    "    .map( lambda word : (word, 1 ))\\\n",
    "    .reduceByKey(add, 10).map(lambda (a,b): (b,a)).sortByKey(False).map(lambda (a,b):(b,a))\n",
    "top10tags = tagsRDD.take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "params = plt.gcf()\n",
    "plSize = params.get_size_inches()\n",
    "params.set_size_inches( (plSize[0]*2, plSize[1]*2) )\n",
    "\n",
    "labels = [i[0] for i in top10tags]\n",
    "sizes = [int(i[1]) for i in top10tags]\n",
    "colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral', \"beige\", \"paleturquoise\", \"pink\", \"lightyellow\", \"coral\"]\n",
    "\n",
    "plt.pie(sizes, labels=labels, colors=colors,autopct='%1.1f%%', shadow=True, startangle=90)\n",
    "\n",
    "plt.axis('equal')\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cols = __tweets.columns[-13:]\n",
    "def expand( t ):\n",
    "    ret = []\n",
    "    for s in [i[0] for i in top10tags]:\n",
    "        if ( s in t.text ):\n",
    "            for tone in cols:\n",
    "                ret += [s.replace(':','').replace('-','') + u\"-\" + unicode(tone) + \":\" + unicode(getattr(t, tone))]\n",
    "    return ret \n",
    "def makeList(l):\n",
    "    return l if isinstance(l, list) else [l]\n",
    "\n",
    "#Create RDD from tweets dataframe\n",
    "tagsRDD = __tweets.map(lambda t: t )\n",
    "\n",
    "#Filter to only keep the entries that are in top10tags\n",
    "tagsRDD = tagsRDD.filter( lambda t: any(s in t.text for s in [i[0] for i in top10tags] ) )\n",
    "\n",
    "#Create a flatMap using the expand function defined above, this will be used to collect all the scores \n",
    "#for a particular tag with the following format: Tag-Tone-ToneScore\n",
    "tagsRDD = tagsRDD.flatMap( expand )\n",
    "\n",
    "#Create a map indexed by Tag-Tone keys \n",
    "tagsRDD = tagsRDD.map( lambda fullTag : (fullTag.split(\":\")[0], float( fullTag.split(\":\")[1]) ))\n",
    "\n",
    "#Call combineByKey to format the data as follow\n",
    "#Key=Tag-Tone\n",
    "#Value=(count, sum_of_all_score_for_this_tone)\n",
    "tagsRDD = tagsRDD.combineByKey((lambda x: (x,1)),\n",
    "                  (lambda x, y: (x[0] + y, x[1] + 1)),\n",
    "                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))\n",
    "\n",
    "#ReIndex the map to have the key be the Tag and value be (Tone, Average_score) tuple\n",
    "#Key=Tag\n",
    "#Value=(Tone, average_score)\n",
    "tagsRDD = tagsRDD.map(lambda (key, ab): (key.split(\"-\")[0], (key.split(\"-\")[1], round(ab[0]/ab[1], 2))))\n",
    "\n",
    "#Reduce the map on the Tag key, value becomes a list of (Tone,average_score) tuples\n",
    "tagsRDD = tagsRDD.reduceByKey( lambda x, y : makeList(x) + makeList(y) )\n",
    "\n",
    "#Sort the (Tone,average_score) tuples alphabetically by Tone\n",
    "tagsRDD = tagsRDD.mapValues( lambda x : sorted(x) )\n",
    "\n",
    "#Format the data as expected by the plotting code in the next cell. \n",
    "#map the Values to a tuple as follow: ([list of tone], [list of average score])\n",
    "#e.g. #someTag:([u'Agreeableness', u'Analytical', u'Anger', u'Cheerfulness', u'Confident', u'Conscientiousness', u'Negative', u'Openness', u'Tentative'], [1.0, 0.0, 0.0, 1.0, 0.0, 0.48, 0.0, 0.02, 0.0])\n",
    "tagsRDD = tagsRDD.mapValues( lambda x : ([elt[0] for elt in x],[elt[1] for elt in x])  )\n",
    "\n",
    "#Use custom sort function to sort the entries by order of appearance in top10tags\n",
    "def customCompare( key ):\n",
    "    for (k,v) in top10tags:\n",
    "        if k == key:\n",
    "            return v\n",
    "    return 0\n",
    "tagsRDD = tagsRDD.sortByKey(ascending=False, numPartitions=None, keyfunc = customCompare)\n",
    "\n",
    "#Take the mean tone scores for the top 10 tags\n",
    "top10tagsMeanScores = tagsRDD.take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "params = plt.gcf()\n",
    "plSize = params.get_size_inches()\n",
    "params.set_size_inches( (plSize[0]*3, plSize[1]*2) )\n",
    "\n",
    "top5tagsMeanScores = top10tagsMeanScores[:5]\n",
    "width = 0\n",
    "ind=np.arange(13)\n",
    "(a,b) = top5tagsMeanScores[0]\n",
    "labels=b[0]\n",
    "colors = [\"beige\", \"paleturquoise\", \"pink\", \"lightyellow\", \"coral\", \"lightgreen\", \"gainsboro\", \"aquamarine\",\"c\"]\n",
    "idx=0\n",
    "for key, value in top5tagsMeanScores:\n",
    "    plt.bar(ind + width, value[1], 0.15, color=colors[idx], label=key)\n",
    "    width += 0.15\n",
    "    idx += 1\n",
    "plt.xticks(ind+0.3, labels)\n",
    "plt.ylabel('AVERAGE SCORE')\n",
    "plt.xlabel('TONES')\n",
    "plt.title('Breakdown of top hashtags by sentiment tones')\n",
    "\n",
    "plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='center',ncol=5, mode=\"expand\", borderaxespad=0.)\n",
    "\n",
    "plt.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "pySpark (Spark 1.6.0) Python 2",
   "language": "python",
   "name": "pyspark1.6python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
